[CORE-5696] Have a deduplicating job worker #18

Open

wants to merge 23 commits into master
Conversation

@skykanin (Contributor) commented Apr 11, 2023:

Implement deduplicating job worker option in consumers

@skykanin skykanin marked this pull request as ready for review April 21, 2023 12:28
@jsynacek (Contributor) left a comment:

Note that this is a public library. I'd say we shouldn't use Jira references here. It's better to link from the internal sources to the public ones.

@skykanin skykanin requested a review from jsynacek April 26, 2023 08:55
@jsynacek (Contributor) left a comment:

Looks good to me, apart from the .gitignore changes.

You might still want to wait for the final review from @arybczak, because I don't really know the code that well.

@arybczak (Collaborator) commented Apr 27, 2023:

I'd like to not merge this until @matobet adjusts his PR that uses this one, to (double) verify that it works as expected.

]
where
operator = case ccMode of
Standard -> "="
Duplicating _field -> "<="
Collaborator:

It would be better to change the type signature of updateJobs so that it takes only a single (idx, Result) in the deduplicating case 🤔 The problem now is that if something goes awry and multiple ids are passed here, the condition id <= ANY (...) will wreak havoc.

Collaborator:

Also, how were tests passing with this bug? :) They should be updated accordingly.

Ensure that only one job is processed at a time in deduplicating
mode so that we don't accidentally update multiple jobs using the
<= operator when checking for matching ids to update rows in the
database table.

-- | Result of processing a job.
data Result = Ok Action | Failed Action
deriving (Eq, Ord, Show)

-- | The mode the consumer will run in.
data Mode = Standard | Duplicating (RawSQL ())
@skykanin (Contributor, author):

Should we allow deduplicating on the primary key column of the jobs table, like ccMode = Duplicating "id"? If you do this right now, you get an ambiguity error in the SQL query used for reserving jobs in the consumer, because part of the query used in reserveJobs becomes SELECT id, id ...

Contributor:

My approach would be not to do that if it's not necessary for proper function now. It can be added later if there's a need for it. And maybe document it somewhere that you can't deduplicate based on fields that are called id.

@@ -269,7 +273,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore

     return (batchSize > 0)

-    reserveJobs :: Int -> m ([job], Int)
+    reserveJobs :: Int -> m (Either job [job], Int)
Contributor:

The Either is quite confusing. Could you add a comment about the idea behind it so that people don't have to figure out what it's supposed to mean? I wonder if a simple data type wouldn't be better here.

Contributor:

And by "better" I mean more readable...
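A hedged sketch of what such a dedicated type could look like (the names ReservedJobs, SingleJob, and JobBatch are invented here for illustration; they are not taken from the PR):

```haskell
-- Illustrative only: constructor names are hypothetical, not from the PR.
data ReservedJobs job
  = SingleJob job   -- deduplicating mode: exactly one job reserved at a time
  | JobBatch [job]  -- standard mode: a whole batch reserved
  deriving (Eq, Show)
```

reserveJobs could then return m (ReservedJobs job, Int), and call sites would pattern match on the constructor instead of decoding the meaning of Left versus Right.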



-- | Result of processing a job.
data Result = Ok Action | Failed Action
deriving (Eq, Ord, Show)

-- | The mode the consumer will run in.
data Mode = Standard | Duplicating (RawSQL ())


Duplicating should probably take a non-empty list of expressions, as one may want to de-duplicate on more than one expression. And the SQL expression type should probably be SQL rather than RawSQL ().

Collaborator:

RawSQL () is fine, it's for "sql literals", i.e. values that can't hold parameters.

, " ORDER BY run_at," <+> raw field <> ", id DESC LIMIT 1 FOR UPDATE SKIP LOCKED),"
, " lock_all AS"
, " (SELECT id," <+> raw field <+> "FROM" <+> raw ccJobsTable
, " WHERE" <+> raw field <+> "= (SELECT" <+> raw field <+> "FROM latest_for_id)"


Doesn't this almost entirely ignore the run_at column? There's no run_at <= <?> now, so this would process any job, even ones scheduled in the future. But even if that condition were set, the de-duplicating job worker would not be very efficient at de-duplicating if jobs were scheduled into the future or when ccNotificationChannel is set.

Collaborator:

Good catch. The mode should lock the group of jobs with the same deduplication id (dedId) that are scheduled to be processed with run_at <= now. If there are other jobs with this dedId scheduled for the future, they should be left alone.

The other problem here is that the reserved_by column isn't consulted. However, introducing a reserved_by check like in the standard case doesn't fully solve the issue, because even while a job is still being processed, another row with the same dedId might be inserted after it started. That row will then be started in parallel with the old one and there's going to be a race :/
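A rough sketch of the query shape this comment asks for, expressed as plain SQL (table and column names such as jobs and ded_id are placeholders standing in for ccJobsTable and the deduplication field; this is a sketch of the reviewer's suggestion, not the query in the PR):

```sql
-- Sketch only: lock the group of due jobs sharing the candidate's ded_id,
-- leaving future-scheduled jobs with the same ded_id alone.
WITH latest AS (
  SELECT id, ded_id FROM jobs
  WHERE reserved_by IS NULL AND run_at <= now()
  ORDER BY run_at, ded_id, id DESC
  LIMIT 1
  FOR UPDATE SKIP LOCKED
)
SELECT j.id
FROM jobs j, latest l
WHERE j.ded_id = l.ded_id
  AND j.reserved_by IS NULL
  AND j.run_at <= now()   -- future jobs with this ded_id are left alone
FOR UPDATE OF j SKIP LOCKED;
```

As the comment notes, even with these conditions a row inserted with the same ded_id after processing starts can still race with the running job.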

@marco44 (Contributor) commented Jul 5, 2023:

Ok, sorry, I feel like I'm arriving after the party…
From what I see:

  • Nothing prevents two sessions from doing this at almost the same time, both thinking they want to work on the same value of "raw field", which I suppose is the dedId mentioned elsewhere?
  • If we want to be 100% sure we don't have two jobs working on the same dedId at the same time, there is a more direct approach: just lock that dedId. Either in memory using an advisory lock (something like SELECT pg_advisory_lock(hash(dedid))), or create a table for this with dedId as the unique column and primary key. When you want to work on a dedId, you insert a record there; when you have finished, you delete it and commit. No one else will be able to work on it in the meantime. Advisory locks are probably better here: you can tie them to a transaction or not (better if you want them freed on error, for instance), and you have the "try" function variants. So maybe it would be simpler to:
  • Find a candidate, get its deduplication id (the SELECT ... FOR UPDATE SKIP LOCKED should help us parallelize it), and try locking that dedId in shared memory. Then the rest probably becomes simpler and more error-proof?

BTW, maybe I misunderstood how this works; I didn't look at the Haskell code all around.
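A minimal sketch of the advisory-lock variant described above, using the Postgres built-ins pg_try_advisory_xact_lock and hashtext (the 'some-ded-id' value is a placeholder; in practice the dedId would be interpolated by the consumer):

```sql
BEGIN;
-- Try to take a transaction-scoped lock on this ded_id. It is released
-- automatically on COMMIT or ROLLBACK, including on error.
SELECT pg_try_advisory_xact_lock(hashtext('some-ded-id'));
-- true  -> we own this ded_id until the transaction ends; process the job
-- false -> another worker holds it; skip and pick another candidate
COMMIT;
```

The transaction-scoped variant matches the "freed on error" property marco44 mentions; the non-try pg_advisory_xact_lock would block instead of skipping.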


-- on which @'ccMode'@ the consumer is running in.
limitJobs = case ccMode of
Standard -> Right
Duplicating _field -> Left . head
Collaborator:

This doesn't seem right.

You're picking the first job from the list and assuming later in updateJob that it was the one with the highest id, but why? The query above doesn't sort on the id field. And even if you take the highest one, it's not guaranteed that you want to update all jobs with a lower id later (once the run_at handling is fixed in the reserveJobs query).

Uhh, this looks to be more complicated than I first thought it would be (even more so considering my other comment below).
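One way to at least remove the ordering assumption this comment points out would be to pick the maximum id explicitly rather than relying on list order (a sketch only: jobId is an assumed accessor, not from the PR, and this does not address the run_at problem discussed in the earlier comments):

```haskell
import Data.List (maximumBy)
import Data.Ord (comparing)

-- Sketch only: 'jobId' stands in for whatever accessor extracts the
-- job's id. Unlike 'head', this does not depend on the query's sort
-- order; it still doesn't fix the run_at issue raised above.
newestJob :: Ord k => (job -> k) -> [job] -> Maybe job
newestJob _     [] = Nothing
newestJob jobId js = Just (maximumBy (comparing jobId) js)
```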

5 participants